跳到主要内容

异步 IO asyncio

Python 并发和并行方案

在 Python 世界有 3 种并发和并行方案,如下:

  • 多线程 (threading)
  • 多进程 (multiprocessing)
  • 异步 IO (asyncio)

这些方案是为了解决不同特点的性能瓶颈。性能问题主要有 2 种:

  • CPU 密集型 (CPU-bound)。这也就是指计算密集型任务,它的特点事需要要进行大量的计算。例如 Python 内置对象的各种方法的执行,科学计算,视频转码等等。
  • I/O 密集型 (I/O-bound)。凡是涉及到网络、内存访问、磁盘 I/O 等的任务都是 IO 密集型任务,这类任务的特点是 CPU 消耗很少,任务的大部分时间都在等待 I/O 操作完成。例如数据库连接、Web 服务、文件读写等等。

这三个方案中对于 CPU 密集型的任务,优化方案只有一种,就是使用多进程充分利用多核 CPU 一起完成任务,达到提速的目的。而对于 I/O 密集型的任务,则这三种方案都可以。

asyncio 是什么?

asyncio 是 Python 用于编写单线程并发代码使用的库,使用 async/await 语法。它被用于编写、执行异步操作和协程,从而允许 IO 绑定和高级别的结构化网络代码,尤其是在高性能网络服务器和客户端应用中。以下是关于 asyncio 的一些关键点,以及如何与 async 和 await 关键字结合使用的例子。

基本概念

  • 事件循环:asyncio 的核心组件,用于执行异步操作和协程。事件循环负责管理和分发事件到不同的任务。
  • 任务:事件循环中的一个可等待对象,用于封装协程的执行。
  • 协程:通过 async def 定义的一个特殊函数,它是 asyncio 用于声明异步操作的方法。
  • async 用于声明一个协程。这样的函数调用不会立即执行;相反,它会返回一个协程对象,这个对象可以被 await、封装成任务或者通过其他方式调度。
  • await 用于等待一个可等待对象(如协程或任务),并且暂停当前协程的执行,直到等待的对象完成。这允许其他协程运行。

asyncio.run 的原理

asyncio.run(coro, *, debug=False) 是 Python 3.7+ 引入的高级函数,用于执行一个异步程序。

它接受一个协程 coro,创建一个新的事件循环,运行传入的协程,直到完成,然后关闭事件循环。如果在调用 asyncio.run() 之前事件循环已经在运行,它会抛出一个异常。这个函数主要是为了简化异步程序的启动。

import asyncio

async def main():
print('Hello')
await asyncio.sleep(1)
print('world')

# Python 3.7+
asyncio.run(main())

基本异步函数使用

同时运行多个协程

import asyncio

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))

# 等待两个任务完成
await task1
await task2

asyncio.run(main())

结合使用 asyncio.gather

  • asyncio.gather 用于并发运行多个可等待对象。
import asyncio

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
await asyncio.gather(
say_after(1, 'hello'),
say_after(2, 'world'),
)

asyncio.run(main())

在非异步函数中执行异步函数

有几种方法可以在非异步函数中执行异步函数,但是最常见和推荐的方式是使用 asyncio.run()

import asyncio

async def async_func():
print("异步函数开始")
await asyncio.sleep(1)
print("异步函数结束")

def normal_func():
asyncio.run(async_func())

normal_func()

如果你已经在一个 asyncio.run(main()) 调用的上下文中并且想要从非异步函数中执行异步代码,你可以使用 asyncio.create_task() 或者 asyncio.get_event_loop().run_until_complete()

死锁的问题

使用 asyncio.create_task()asyncio.get_event_loop().run_until_complete() 在某些情况下造成死锁,主要是因为这些操作可能会导致事件循环的不当管理或使用,尤其是当它们被错误地嵌套使用时。下面是一些可能导致死锁的场景和原因:

1. 错误的嵌套调用

在已经运行的事件循环中尝试使用 asyncio.run()event_loop.run_until_complete() 启动另一个事件循环是不被允许的。asyncio.run() 旨在作为进入点,它会创建一个新的事件循环并在结束时关闭它。如果在这个事件循环内部再次调用 asyncio.run(),Python 会抛出异常,因为它尝试创建一个新的事件循环而当前已经有一个运行中的事件循环。

2. 不适当的阻塞调用

  • 使用 asyncio.get_event_loop().run_until_complete() 在异步函数中等待另一个异步函数完成,如果这个调用是在同一个线程和事件循环中发生的,可能会导致死锁。因为 run_until_complete() 会阻塞调用它的线程直到协程完成,但如果事件循环正在等待当前协程让出控制(通过 await),这将永远不会发生。
  • 类似地,如果在一个正在运行的事件循环中使用 asyncio.create_task() 创建任务,但没有适当地管理这些任务(比如,通过 await 等待它们完成),可能会导致应用逻辑上的死锁,尽管这不会阻塞事件循环本身。

3. 资源竞争和锁

如果使用 asyncio.Lock 或其他同步原语不当,也可能导致死锁。例如,如果一个协程获取了一个锁并在释放锁之前等待另一个需要相同锁的协程完成,这将导致死锁。

如何避免死锁

  • 避免在已经运行的事件循环内部启动新的事件循环。
  • 使用 await 直接等待异步函数或任务完成,而不是尝试在一个异步环境中同步等待它们。
  • 当使用同步原语(如锁)时,确保逻辑上不会导致相互等待的情况。
  • 在设计异步应用时,确保任务之间的依赖关系清晰,避免相互阻塞。

正确管理异步任务和事件循环是编写有效且无死锁异步代码的关键。理解和遵循 asyncio 的设计原则可以帮助开发者避免这些常见的陷阱。

await() 函数

__await__() 方法是 Python 异步编程中的一个特殊方法,它使得一个对象成为可等待的。这意味着对象可以在 async 函数中被 await 关键字使用。当 await 遇到一个对象时,它会查找该对象的 __await__() 方法,该方法需要返回一个迭代器。然后,事件循环负责从这个迭代器中获取值,直到迭代器耗尽。

__await__() 方法主要用于创建兼容 asyncio 事件循环的自定义异步对象。通过实现这个方法,你可以定义对象的异步操作,使其在 await 表达式中可用。这对于创建异步的库或框架特别有用,可以提供自定义的异步操作。

下面是一个实现 __await__() 方法的简单例子,演示如何使一个普通类成为可在 async 函数中 await 的对象:

class AsyncAdd:
def __init__(self, a, b):
self.a = a
self.b = b

def __await__(self):
# 这个生成器函数模拟异步操作
yield from asyncio.sleep(1).__await__()
return self.a + self.b

async def main():
result = await AsyncAdd(1, 2)
print(result)

# 运行主函数
import asyncio
asyncio.run(main())

在这个例子中,AsyncAdd 类通过定义 __await__() 方法成为了一个可等待对象。当 await AsyncAdd(1, 2) 被调用时,它实际上会等待 asyncio.sleep(1) 完成,然后返回两个数字的和。这里的关键点是 __await__() 方法返回了一个迭代器,该迭代器由 yield from 语句产生,用来暂停当前协程,直到 asyncio.sleep(1) 完成。

注意事项:

  • __await__() 方法必须返回一个迭代器,通常是通过生成器实现。
  • 它允许类的实例被用在 await 表达式中,但该类的实例本身不是协程。它仅仅是异步操作的一种包装。
  • 在设计自定义的异步操作时,确保遵循异步编程的最佳实践,避免阻塞操作,以免破坏事件循环的非阻塞性质。

通过 __await__() 方法,Python 提供了一种灵活的机制来支持异步编程,使得开发者可以创建兼容异步操作的自定义对象。